package com.jfinal.plugin.zbus.proto;

import com.google.common.collect.Maps;
import com.jfinal.log.Logger;
import com.jfinal.plugin.zbus.core.TMsgHandler;
import org.zbus.broker.Broker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Protocol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.jfinal.plugin.zbus.proto.ZbusQueue.tMsgReceiverQMap;
import static com.jfinal.plugin.zbus.proto.ZbusTopic.tMsgReceiverTMap;

/**
 * Fluent wrapper around zbus {@link Consumer} objects.
 *
 * <p>Main operations: (init, create, go) x (mq, topic), plus register/close.
 * One instance can drive several queues/topics: every {@code create} call
 * produces its own group of consumers, and every group is tracked so that
 * {@link #close()} can close all of them.
 *
 * <p>Created by dairymix on 17/5/3.
 */
public class ZbusConsumer {

    private static final Logger log = Logger.getLogger(ZbusConsumer.class);

    /** Every consumer group created/registered through this instance; drained by {@link #close()}. */
    private final List<Map<Integer,Consumer>> consumerMapList = new ArrayList<>();

    Broker broker;
    String mqName;
    String topic;

    // Cluster support: the consumer group produced by the most recent create(...) call,
    // keyed by the consumer's index inside that group.
    public Map<Integer,Consumer> consumerMap = Maps.newConcurrentMap();
    Integer consumer_max_num;
    Boolean hasGlobalMaxNum;

    /**
     * Uses the broker obtained from a new {@code ZbusBroker}.
     *
     * @return this instance, for chaining
     */
    public ZbusConsumer broker(){
        this.broker = new ZbusBroker().doing().me;
        return this;
    }

    /**
     * Uses the supplied broker.
     *
     * @param broker broker handed to every consumer created later
     * @return this instance, for chaining
     */
    public ZbusConsumer broker(Broker broker){
        this.broker = broker;
        return this;
    }

    /**
     * Remembers the queue name for a queue consumer.
     *
     * @param mqName queue name
     * @return this instance, for chaining
     */
    public ZbusConsumer initQ(String mqName){
        this.mqName = mqName;
        log.info("创建Queue消费者(mq=" + mqName + ")");
        return this;
    }

    /**
     * Remembers the queue name and topic for a topic consumer.
     *
     * @param mqName queue name
     * @param topic  topic name
     * @return this instance, for chaining
     */
    public ZbusConsumer initT(String mqName, String topic){
        this.mqName = mqName;
        this.topic = topic;
        log.info("创建Topic消费者(mq=" + mqName + ",topic=" + topic + ")");
        return this;
    }

    /**
     * Sets whether the global consumer count overrides per-receiver counts.
     *
     * @param hasNum {@code true} to let the global count win
     * @return this instance, for chaining
     */
    public ZbusConsumer hasGlobalMaxNum(boolean hasNum){
        this.hasGlobalMaxNum = hasNum;
        return this;
    }

    /**
     * @return the global-count flag, or {@code false} when it was never set
     */
    public Boolean hasGlobalMaxNum(){
        return (hasGlobalMaxNum != null)?hasGlobalMaxNum:false;
    }

    /**
     * @return the configured consumer count, or 1 when it is unset or not positive
     */
    public Integer maxConsumerNum(){
        return (consumer_max_num != null && consumer_max_num > 0)?consumer_max_num:1;
    }

    /**
     * Sets the consumer count.
     *
     * @param consumerNum number of consumers per queue/topic
     * @return this instance, for chaining
     */
    public ZbusConsumer maxConsumerNum(Integer consumerNum){
        this.consumer_max_num = consumerNum;
        return this;
    }

    /**
     * Creates a group of {@code consumerNum} consumers and registers the group for {@link #close()}.
     *
     * <p>If {@link #consumerMap} already holds consumers from an earlier call, a fresh map is
     * started first. Reusing the old map would overwrite (and leak) the earlier consumers and
     * would let a later {@code goQ}/{@code goT} re-bind the leftovers to the wrong handler.
     *
     * @param broker      broker passed to each {@link Consumer}
     * @param mqName      queue name passed to each {@link Consumer}
     * @param mqMode      queue mode passed to each {@link Consumer}
     * @param consumerNum number of consumers to create; {@code null} falls back to
     *                    {@link #maxConsumerNum()}, a non-positive value creates none
     * @return this instance, for chaining
     * @since V1.0.0
     */
    public ZbusConsumer create(Broker broker, String mqName, Protocol.MqMode mqMode,Integer consumerNum){
        int count = (consumerNum != null) ? consumerNum : maxConsumerNum();
        if (consumerMap == null || !consumerMap.isEmpty()) {
            // The previous group stays tracked in consumerMapList; start a new one.
            consumerMap = Maps.newConcurrentMap();
        }
        for (int i = 0; i < count; i++) {
            consumerMap.put(i,new Consumer(broker, mqName, mqMode));
        }
        // Register the group once, not once per consumer.
        register(consumerMap);
        log.info("创建消费者成功");
        return this;
    }

    /** Same as the four-argument overload, using {@link #maxConsumerNum()} as the count. */
    public ZbusConsumer create(Broker broker, String mqName, Protocol.MqMode mqMode){
        return create(broker,mqName,mqMode,maxConsumerNum());
    }

    /** Same as the four-argument overload, using the broker and queue name stored on this instance. */
    public ZbusConsumer create(Protocol.MqMode mqMode,Integer consumerNum){
        return create(broker,mqName,mqMode,consumerNum);
    }

    /** Uses the stored broker and queue name and {@link #maxConsumerNum()} as the count. */
    public ZbusConsumer create(Protocol.MqMode mqMode){
        return create(broker,mqName,mqMode);
    }

    /**
     * Attaches the handler to every consumer in the map and starts each one.
     * A failure on one consumer is logged and does not stop the remaining ones.
     * Does nothing when either argument is {@code null}.
     *
     * @param consumerMap consumers to start
     * @param handler     message handler passed to {@code onMessage}
     * @return this instance, for chaining
     */
    public ZbusConsumer goQ(Map<Integer,Consumer> consumerMap, TMsgHandler<?> handler){
        if (consumerMap == null || handler == null) {
            return this;
        }
        for (Map.Entry<Integer,Consumer> entry : consumerMap.entrySet()) {
            Consumer consumer = entry.getValue();
            try {
                consumer.onMessage(handler);
                consumer.start(); // start the consumer's listener
            } catch (IOException e) {
                log.error("Failed to start queue consumer #" + entry.getKey(), e);
            }
        }
        return this;
    }

    /** Starts the current {@link #consumerMap} group with the given handler. */
    public ZbusConsumer goQ(TMsgHandler<?> handler){
        return goQ(consumerMap,handler);
    }

    /**
     * Sets the topic on every consumer in the map, attaches the handler and starts each one.
     * A failure on one consumer is logged and does not stop the remaining ones.
     * Does nothing when any argument is {@code null}.
     *
     * @param consumerMap consumers to start
     * @param topic       topic set on each consumer
     * @param handler     message handler passed to {@code onMessage}
     * @return this instance, for chaining
     */
    public ZbusConsumer goT(Map<Integer,Consumer> consumerMap, String topic, TMsgHandler<?> handler){
        if (consumerMap == null || topic == null || handler == null) {
            return this;
        }
        for (Map.Entry<Integer,Consumer> entry : consumerMap.entrySet()) {
            Consumer consumer = entry.getValue();
            try {
                consumer.setTopic(topic);
                consumer.onMessage(handler);
                consumer.start(); // start the consumer's listener
            } catch (IOException e) {
                log.error("Failed to start topic consumer #" + entry.getKey() + " (topic=" + topic + ")", e);
            }
        }
        return this;
    }

    /** Starts the current {@link #consumerMap} group on the stored topic with the given handler. */
    public ZbusConsumer goT(TMsgHandler<?> handler){
        return goT(consumerMap,topic,handler);
    }

    /**
     * Full queue workflow: set broker, init, create, start, register.
     *
     * @param broker      broker to use
     * @param mqName      queue name
     * @param handler     message handler
     * @param consumerNum number of consumers to create
     * @return this instance, for chaining
     */
    public ZbusConsumer doingQ(Broker broker, String mqName, TMsgHandler<?> handler, Integer consumerNum){
        return broker(broker).initQ(mqName).maxConsumerNum(consumerNum)
                .create(Protocol.MqMode.MQ,consumerNum)
                .goQ(handler)
                .register();
    }

    /**
     * Full topic workflow: set broker, init, create, start, register.
     *
     * @param broker      broker to use
     * @param mqName      queue name
     * @param topic       topic name
     * @param handler     message handler
     * @param consumerNum number of consumers to create
     * @return this instance, for chaining
     */
    public ZbusConsumer doingT(Broker broker, String mqName, String topic, TMsgHandler<?> handler, Integer consumerNum){
        return broker(broker).initT(mqName,topic).maxConsumerNum(consumerNum)
                .create(Protocol.MqMode.PubSub,consumerNum)
                .goT(handler)
                .register();
    }

    /**
     * Creates queue consumers for every receiver in the map.
     *
     * <p>When a receiver declares its own consumer count (local) and a global count is
     * supplied as well, the global count wins and the local one is ignored.
     *
     * @param broker           broker to use
     * @param tMsgReceiverQMap receivers keyed by queue name; {@code null} is treated as empty
     * @param hasGlobalMaxNum  whether {@code consumerNum} overrides the per-receiver count;
     *                         {@code null} is treated as {@code false}
     * @param consumerNum      global consumer count
     * @return this instance, for chaining
     */
    public ZbusConsumer doingAllQ(Broker broker, Map<String, ZbusQueue.ReceiverQ> tMsgReceiverQMap,
                                  Boolean hasGlobalMaxNum, Integer consumerNum){
        if (tMsgReceiverQMap == null) {
            return this;
        }
        boolean useGlobal = Boolean.TRUE.equals(hasGlobalMaxNum);
        tMsgReceiverQMap.forEach((mq,receiver)->{
            Integer num;
            if (useGlobal) {
                num = consumerNum;
            } else {
                num = receiver.maxConsumerNum;
            }
            doingQ(broker,mq,receiver.msgHandler,num);
        });
        return this;
    }

    /** Same as the four-argument overload, using the broker stored on this instance. */
    public ZbusConsumer doingAllQ(Map<String, ZbusQueue.ReceiverQ> tMsgReceiverQMap,
                                  Boolean hasGlobalMaxNum, Integer consumerNum){
        return doingAllQ(broker,tMsgReceiverQMap,hasGlobalMaxNum,consumerNum);
    }

    /** Same as the four-argument overload, using the statically imported {@code ZbusQueue.tMsgReceiverQMap}. */
    public ZbusConsumer doingAllQ(Broker broker,Boolean hasGlobalMaxNum,Integer consumerNum){
        return doingAllQ(broker,tMsgReceiverQMap,hasGlobalMaxNum,consumerNum);
    }

    /** Uses the stored broker and the statically imported {@code ZbusQueue.tMsgReceiverQMap}. */
    public ZbusConsumer doingAllQ(Boolean hasGlobalMaxNum,Integer consumerNum){
        return doingAllQ(tMsgReceiverQMap,hasGlobalMaxNum,consumerNum);
    }

    /** Uses the flag and count configured on this instance. */
    public ZbusConsumer doingAllQ(){
        return doingAllQ(hasGlobalMaxNum(),maxConsumerNum());
    }

    /**
     * Creates topic consumers for every (queue, topic) handler in the map.
     *
     * <p>When a receiver declares its own consumer count (local) and a global count is
     * supplied as well, the global count wins and the local one is ignored.
     *
     * @param broker           broker to use
     * @param tMsgReceiverTMap receivers keyed by queue name; {@code null} is treated as empty
     * @param hasGlobalMaxNum  whether {@code consumerNum} overrides the per-receiver count;
     *                         {@code null} is treated as {@code false}
     * @param consumerNum      global consumer count
     * @return this instance, for chaining
     */
    public ZbusConsumer doingAllT(Broker broker, Map<String, ZbusTopic.ReceiverT> tMsgReceiverTMap,
                                  Boolean hasGlobalMaxNum,Integer consumerNum){
        if (tMsgReceiverTMap == null) {
            return this;
        }
        boolean useGlobal = Boolean.TRUE.equals(hasGlobalMaxNum);
        tMsgReceiverTMap.forEach((mq,receiver)->{
            Integer num;
            if (useGlobal) {
                num = consumerNum;
            } else {
                num = receiver.maxConsumerNum;
            }
            // One consumer group per topic registered on this queue.
            receiver.tmc.forEach((topicName,handler)->{
                doingT(broker,mq,topicName,handler,num);
            });
        });
        return this;
    }

    /** Same as the four-argument overload, using the broker stored on this instance. */
    public ZbusConsumer doingAllT(Map<String, ZbusTopic.ReceiverT> tMsgReceiverTMap,
                                  Boolean hasGlobalMaxNum,Integer consumerNum){
        return doingAllT(broker,tMsgReceiverTMap,hasGlobalMaxNum,consumerNum);
    }

    /** Same as the four-argument overload, using the statically imported {@code ZbusTopic.tMsgReceiverTMap}. */
    public ZbusConsumer doingAllT(Broker broker,Boolean hasGlobalMaxNum,Integer consumerNum){
        return doingAllT(broker,tMsgReceiverTMap,hasGlobalMaxNum,consumerNum);
    }

    /** Uses the stored broker and the statically imported {@code ZbusTopic.tMsgReceiverTMap}. */
    public ZbusConsumer doingAllT(Boolean hasGlobalMaxNum,Integer consumerNum){
        return doingAllT(tMsgReceiverTMap,hasGlobalMaxNum,consumerNum);
    }

    /** Uses the flag and count configured on this instance. */
    public ZbusConsumer doingAllT(){
        return doingAllT(hasGlobalMaxNum(),maxConsumerNum());
    }

    /**
     * Closes every consumer in every registered group, then forgets the groups.
     * A failure to close one consumer is logged and does not stop the remaining ones.
     *
     * @return this instance, for chaining
     */
    public ZbusConsumer close(){
        for (Map<Integer,Consumer> group : consumerMapList) {
            for (Map.Entry<Integer,Consumer> entry : group.entrySet()) {
                try {
                    entry.getValue().close();
                } catch (IOException e) {
                    log.error("Failed to close consumer #" + entry.getKey(), e);
                }
            }
            group.clear();
        }
        consumerMapList.clear();
        return this;
    }

    /**
     * Creates queue consumers for every receiver using a new {@code ZbusConsumer}.
     *
     * @see #doingAllQ(Broker, Map, Boolean, Integer)
     */
    public static void createQConsumer(Broker broker,Map<String, ZbusQueue.ReceiverQ> tMsgReceiverQMap,
                                       Boolean hasGlobalMaxNum,Integer consumerNum){
        new ZbusConsumer().doingAllQ(broker,tMsgReceiverQMap,hasGlobalMaxNum,consumerNum);
    }

    /**
     * Creates topic consumers for every receiver using a new {@code ZbusConsumer}.
     *
     * @see #doingAllT(Broker, Map, Boolean, Integer)
     */
    public static void createTConsumer(Broker broker,Map<String, ZbusTopic.ReceiverT> tMsgReceiverTMap,
                                       Boolean hasGlobalMaxNum,Integer consumerNum){
        new ZbusConsumer().doingAllT(broker,tMsgReceiverTMap,hasGlobalMaxNum,consumerNum);
    }

    /**
     * Adds a consumer group to the list drained by {@link #close()}.
     * {@code null} is ignored, and so is a map instance that is already registered.
     *
     * @param consumerMap group to track
     * @return this instance, for chaining
     */
    public ZbusConsumer register(Map<Integer,Consumer> consumerMap){
        if (consumerMap == null) {
            return this;
        }
        // Compare by identity: Map.equals compares contents, so two distinct empty
        // groups would otherwise be mistaken for one another.
        for (Map<Integer,Consumer> tracked : consumerMapList) {
            if (tracked == consumerMap) {
                return this;
            }
        }
        consumerMapList.add(consumerMap);
        return this;
    }

    /** Registers the current {@link #consumerMap} group. */
    public ZbusConsumer register(){
        return register(consumerMap);
    }

}
